/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.geode.distributed.internal.locks;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.logging.log4j.Logger;

import org.apache.geode.DataSerializer;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.MessageWithReply;
import org.apache.geode.distributed.internal.PooledDistributionMessage;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.logging.internal.log4j.api.LogService;

/**
 * A processor for telling the old grantor that it is deposed by a new grantor. Processor waits for
 * ack before completing.
 *
 * @since GemFire 4.0 (renamed from ExpectTransferProcessor)
 */
public class DeposeGrantorProcessor extends ReplyProcessor21 {

  private static final Logger logger = LogService.getLogger();

  ////////// Public static entry point /////////


  /**
   * Send a message to oldGrantor telling it that it is deposed by newGrantor. Send does not
   * complete until an ack is received or the oldGrantor leaves the system.
   */
  static void send(String serviceName, InternalDistributedMember oldGrantor,
      InternalDistributedMember newGrantor, long newGrantorVersion, int newGrantorSerialNumber,
      DistributionManager dm) {
    final InternalDistributedMember elder = dm.getId();
    if (elder.equals(oldGrantor)) {
      doOldGrantorWork(serviceName, elder, newGrantor, newGrantorVersion, newGrantorSerialNumber,
          dm, null);
    } else {
      DeposeGrantorProcessor processor = new DeposeGrantorProcessor(dm, oldGrantor);
      DeposeGrantorMessage.send(serviceName, oldGrantor, newGrantor, newGrantorVersion,
          newGrantorSerialNumber, dm, processor);
      try {
        processor.waitForRepliesUninterruptibly();
      } catch (ReplyException e) {
        e.handleCause();
      }
    }
  }

  protected static void doOldGrantorWork(final String serviceName,
      final InternalDistributedMember elder, final InternalDistributedMember youngTurk,
      final long newGrantorVersion, final int newGrantorSerialNumber, final DistributionManager dm,
      final DeposeGrantorMessage msg) {
    try {
      DLockService svc = DLockService.getInternalServiceNamed(serviceName);
      if (svc != null) {
        LockGrantorId newLockGrantorId =
            new LockGrantorId(dm, youngTurk, newGrantorVersion, newGrantorSerialNumber);
        svc.deposeOlderLockGrantorId(newLockGrantorId);
      }
    } finally {
      if (msg != null) {
        msg.reply(dm);
      }
    }
  }

  //////////// Instance methods //////////////

  /**
   * Creates a new instance of DeposeGrantorProcessor
   */
  private DeposeGrantorProcessor(DistributionManager dm, InternalDistributedMember oldGrantor) {
    super(dm, oldGrantor);
  }


  /////////////// Inner message classes //////////////////

  public static class DeposeGrantorMessage extends PooledDistributionMessage
      implements MessageWithReply {
    private int processorId;
    private String serviceName;
    private InternalDistributedMember newGrantor;
    private long newGrantorVersion;
    private int newGrantorSerialNumber;

    protected static void send(String serviceName, InternalDistributedMember oldGrantor,
        InternalDistributedMember newGrantor, long newGrantorVersion, int newGrantorSerialNumber,
        DistributionManager dm, ReplyProcessor21 proc) {
      DeposeGrantorMessage msg = new DeposeGrantorMessage();
      msg.serviceName = serviceName;
      msg.newGrantor = newGrantor;
      msg.newGrantorVersion = newGrantorVersion;
      msg.newGrantorSerialNumber = newGrantorSerialNumber;
      msg.processorId = proc.getProcessorId();
      msg.setRecipient(oldGrantor);
      if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
        logger.trace(LogMarker.DLS_VERBOSE, "DeposeGrantorMessage sending {} to {}", msg,
            oldGrantor);
      }
      dm.putOutgoing(msg);
    }

    @Override
    public int getProcessorId() {
      return this.processorId;
    }

    void reply(DistributionManager dm) {
      ReplyMessage.send(this.getSender(), this.getProcessorId(), null, dm);
    }

    @Override
    protected void process(final ClusterDistributionManager dm) {
      // if we are currently the grantor then
      // mark it as being destroyed until we hear from this.newGrantor
      // or it goes away or the grantor that sent us this message goes away.

      InternalDistributedMember elder = this.getSender();
      InternalDistributedMember youngTurk = this.newGrantor;

      doOldGrantorWork(this.serviceName, elder, youngTurk, this.newGrantorVersion,
          this.newGrantorSerialNumber, dm, this);
    }

    @Override
    public int getDSFID() {
      return DEPOSE_GRANTOR_MESSAGE;
    }

    @Override
    public void fromData(DataInput in,
        DeserializationContext context) throws IOException, ClassNotFoundException {
      super.fromData(in, context);
      this.processorId = in.readInt();
      this.serviceName = DataSerializer.readString(in);
      this.newGrantor = (InternalDistributedMember) DataSerializer.readObject(in);
      this.newGrantorVersion = in.readLong();
      this.newGrantorSerialNumber = in.readInt();
    }

    @Override
    public void toData(DataOutput out,
        SerializationContext context) throws IOException {
      super.toData(out, context);
      out.writeInt(this.processorId);
      DataSerializer.writeString(this.serviceName, out);
      DataSerializer.writeObject(this.newGrantor, out);
      out.writeLong(this.newGrantorVersion);
      out.writeInt(this.newGrantorSerialNumber);
    }

    @Override
    public String toString() {
      StringBuffer buff = new StringBuffer();
      buff.append("DeposeGrantorMessage (serviceName='").append(this.serviceName)
          .append("' processorId=").append(this.processorId).append(" newGrantor=")
          .append(this.newGrantor).append(" newGrantorVersion=").append(this.newGrantorVersion)
          .append(" newGrantorSerialNumber=").append(this.newGrantorSerialNumber).append(")");
      return buff.toString();
    }
  }
}
